Creating Oozie Workflows with Hue, and the Oozie Java Client API


1. Creating an Oozie Workflow in Hue

(1) Local mode

Log in to Hue and create an Oozie workflow. Click the [Workspace] button; a new page opens. Enter the Workspace and upload the JAR package to the lib directory, then:

1) Add a Spark2 task;
2) Choose the directory containing the JAR and the JAR file name;
3) Fill in the MainClass and add the JAR file;
4) Set the action to use Spark2, otherwise Spark1 is used by default;
5) Save the Oozie workflow, then click Submit.

The workflow is submitted and runs successfully.

(2) YARN client mode

Enter the Workspace, go to the lib directory, and upload the JAR package. Drag in a Spark Program action and fill in the application's main class name and its configuration parameters. Click the small gear icon to view the remaining parameters, then save the configuration and submit the job. The workflow configuration file generated automatically by Hue is as follows:

<workflow-app name="spark-pi-wf" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-node"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-node">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>spark2</value>
                </property>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>default</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>client</mode>
            <name>YarnSparkPi</name>
            <class>org.apache.spark.examples.SparkPi</class>
            <jar>spark-examples_2.11-2.2.0.jar</jar>
            <spark-opts>--driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 --conf spark.yarn.historyServer.address=http://bigdata-cdh01:18080 --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://bigdata-cdh01:8020/spark/eventLogs --conf spark.yarn.jars=hdfs://bigdata-cdh01:8020/spark/jars/*</spark-opts>
            <file>/user/root/oozie_works/examples/apps/hue-oozie-1646279075.31/lib/spark-examples_2.11-2.2.0.jar#spark-examples_2.11-2.2.0.jar</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

(3) YARN cluster mode

Build the workflow in Hue as in the YARN client mode above, but set the application to run in yarn-cluster mode, then submit it. The job runs successfully.

(4) Scheduling

Open the Scheduler page and build a scheduled task on top of the workflow; a time-based schedule can be configured. Set the name and description, choose the workflow, and define the scheduling (cron) expression, paying attention to the time-zone selection.

2. Oozie Java Client API

Apache Oozie is a workflow scheduling system with the following features and advantages:

1) Workflows are scheduled as DAGs (Directed Acyclic Graphs);
2) Coordinator jobs can be triggered by time and by data-set availability;
3) It integrates with other tasks in the Hadoop ecosystem, such as MapReduce, Pig, Hive, Sqoop, and DistCp;
4) Scalable: an Oozie launcher itself runs as a MapReduce program, but map-only, with no reduce phase;
5) Reliable: failed tasks can be retried.

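The client-side examples in this section use Oozie's official Java client library. With Maven, the dependency looks roughly like this (the version is an illustrative placeholder and should match your Oozie server installation):

```xml
<dependency>
    <groupId>org.apache.oozie</groupId>
    <artifactId>oozie-client</artifactId>
    <version>4.1.0</version>
</dependency>
```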

(1) Workflow Submit

Submit the SparkPi (Pi estimation) program to run on YARN; the related configuration and code are as follows:

package com.yyds.tags.oozie;

public class OozieConstant {
    static String HDFSROOTPATH = "hdfs://192.168.42.7:8020";
    static String OOZIE_URL = "http://192.168.42.7:11000/oozie/";
    static String jobTracker = "192.168.42.7:8032";
}

package com.yyds.tags.oozie;

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;

import java.util.Properties;

public class OozieWorkflowSubmit {
    public static void main(String[] args) throws OozieClientException, InterruptedException {
        String OOZIE_URL = OozieConstant.OOZIE_URL;
        // TODO: 1. Build the OozieClient instance
        OozieClient oozieClient = new OozieClient(OOZIE_URL);
        // TODO: 2. Set the workflow configuration parameters
        Properties jobConf = oozieClient.createConfiguration();
        // 2.1. System parameters
        jobConf.setProperty("oozie.use.system.libpath", "true");
        jobConf.setProperty("user.name", "root");
        jobConf.setProperty("oozie.libpath", OozieConstant.HDFSROOTPATH
                + "/user/root/share/lib/lib_20190723215106/spark2");
        // 2.2. Required parameters
        jobConf.setProperty("nameNode", OozieConstant.HDFSROOTPATH);
        jobConf.setProperty("jobTracker", OozieConstant.jobTracker);
        jobConf.setProperty("queueName", "default");
        // 2.3. YARN submission parameters
        jobConf.setProperty("master", "yarn");
        jobConf.setProperty("mode", "client");
        jobConf.setProperty("sparkOptions", " --driver-memory 512m "
                + "--executor-memory 512m "
                + "--num-executors 1 "
                + "--executor-cores 1 "
                + "--conf spark.yarn.historyServer.address=http://192.168.42.7:18080 "
                + "--conf spark.eventLog.enabled=true "
                + "--conf spark.eventLog.dir=hdfs://192.168.42.7:8020/spark/eventLogs "
                + "--conf spark.yarn.jars=hdfs://192.168.42.7:8020/spark/jars/*");
        jobConf.setProperty("mainClass", "org.apache.spark.examples.SparkPi");
        jobConf.setProperty("appName", "SparkExamplePi");
        jobConf.setProperty("jarPath", OozieConstant.HDFSROOTPATH
                + "/user/root/oozie_works/cron-yarn_pi/lib/spark-examples_2.11-2.2.0.jar");
        jobConf.setProperty("appParam", "10");
        // 2.4. Oozie workflow application path
        jobConf.setProperty(OozieClient.APP_PATH, OozieConstant.HDFSROOTPATH
                + "/user/root/oozie_works/cron_yarn_pi/workflow.xml");
        // TODO: 3. Submit the Oozie workflow; the submission job ID is returned
        String jobId = oozieClient.run(jobConf);
        System.out.println("JobId = " + jobId);
        // TODO: 4. Poll the job status by its job ID
        while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            System.out.println("Workflow job running ...");
            Thread.sleep(10 * 1000);
        }
        System.out.println("Workflow job completed ...");
    }
}
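The while loop above busy-polls getJobInfo until the status leaves RUNNING, and would spin forever on a hung job. A small generic helper with an attempt limit keeps the same pattern but bounds the wait. This is only a sketch; JobPoller and awaitCompletion are names of my own, not part of the Oozie client API:

```java
import java.util.function.Supplier;

public class JobPoller {
    // Polls statusSupplier until it returns a terminal status (anything other
    // than "RUNNING") or maxAttempts is reached; returns the last status seen.
    static String awaitCompletion(Supplier<String> statusSupplier,
                                  int maxAttempts, long sleepMillis)
            throws InterruptedException {
        String status = statusSupplier.get();
        int attempts = 1;
        while ("RUNNING".equals(status) && attempts < maxAttempts) {
            Thread.sleep(sleepMillis);
            status = statusSupplier.get();
            attempts++;
        }
        return status;
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake status source: reports RUNNING twice, then SUCCEEDED
        final int[] calls = {0};
        String result = awaitCompletion(
                () -> ++calls[0] < 3 ? "RUNNING" : "SUCCEEDED", 10, 1);
        System.out.println(result); // prints SUCCEEDED
    }
}
```

In the real submission code, the supplier would wrap `oozieClient.getJobInfo(jobId).getStatus().toString()`.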

Note: the Spark application's dependency JARs and the workflow.xml file must be uploaded to the HDFS application directory beforehand.
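Assuming the application directory /user/root/oozie_works/cron_yarn_pi used in the code above, the upload could look like this (paths are illustrative and must match your own layout; run these against your cluster):

```shell
# Create the application directory and its lib/ subdirectory on HDFS
hdfs dfs -mkdir -p /user/root/oozie_works/cron_yarn_pi/lib

# Upload the workflow definition and the application JAR
hdfs dfs -put workflow.xml /user/root/oozie_works/cron_yarn_pi/
hdfs dfs -put spark-examples_2.11-2.2.0.jar /user/root/oozie_works/cron_yarn_pi/lib/

# Verify the layout
hdfs dfs -ls -R /user/root/oozie_works/cron_yarn_pi
```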

The content of the workflow file is:

<workflow-app name="spark-pi-wf" xmlns="uri:oozie:workflow:0.5">
    <start to="spark-node"/>
    <kill name="Kill">
        <message>Action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <action name="spark-node">
        <spark xmlns="uri:oozie:spark-action:0.2">
            <job-tracker>${jobTracker}</job-tracker>
            <name-node>${nameNode}</name-node>
            <configuration>
                <property>
                    <name>oozie.action.sharelib.for.spark</name>
                    <value>spark2</value>
                </property>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>default</value>
                </property>
            </configuration>
            <master>yarn</master>
            <mode>client</mode>
            <name>YarnSparkPi</name>
            <class>org.apache.spark.examples.SparkPi</class>
            <jar>spark-examples_2.11-2.2.0.jar</jar>
            <spark-opts>--driver-memory 512m --executor-memory 512m --num-executors 1 --executor-cores 1 --conf spark.yarn.historyServer.address=http://bigdata-cdh01:18080 --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://bigdata-cdh01:8020/spark/eventLogs --conf spark.yarn.jars=hdfs://bigdata-cdh01:8020/spark/jars/*</spark-opts>
            <file>/user/root/oozie_works/examples/apps/hue-oozie-1646279075.31/lib/spark-examples_2.11-2.2.0.jar#spark-examples_2.11-2.2.0.jar</file>
        </spark>
        <ok to="End"/>
        <error to="Kill"/>
    </action>
    <end name="End"/>
</workflow-app>

(2) Coordinator Submit

Modify the code above: add the schedule time settings and point it at the coordinator configuration file, then submit it. The details are as follows:

package com.yyds.tags.oozie;

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob;

import java.util.Properties;

public class OozieCoordinatorSubmit {
    public static void main(String[] args) throws OozieClientException, InterruptedException {
        String OOZIE_URL = OozieConstant.OOZIE_URL;
        // TODO: 1. Build the OozieClient instance
        OozieClient oozieClient = new OozieClient(OOZIE_URL);
        // TODO: 2. Set the workflow configuration parameters
        Properties jobConf = oozieClient.createConfiguration();
        // 2.1. System parameters
        jobConf.setProperty("oozie.use.system.libpath", "true");
        jobConf.setProperty("user.name", "root");
        jobConf.setProperty("oozie.libpath", OozieConstant.HDFSROOTPATH
                + "/user/root/share/lib/lib_20190723215106/spark2");
        // 2.2. Required parameters
        jobConf.setProperty("nameNode", OozieConstant.HDFSROOTPATH);
        jobConf.setProperty("jobTracker", "192.168.42.7:8032");
        jobConf.setProperty("queueName", "default");
        // 2.3. YARN submission parameters
        jobConf.setProperty("master", "yarn");
        jobConf.setProperty("mode", "client");
        jobConf.setProperty("sparkOptions", " --driver-memory 512m "
                + "--executor-memory 512m "
                + "--num-executors 1 "
                + "--executor-cores 1 "
                + "--conf spark.yarn.historyServer.address=http://192.168.42.7:18080 "
                + "--conf spark.eventLog.enabled=true "
                + "--conf spark.eventLog.dir=hdfs://192.168.42.7:8020/spark/eventLogs "
                + "--conf spark.yarn.jars=hdfs://192.168.42.7:8020/spark/jars/*");
        jobConf.setProperty("mainClass", "org.apache.spark.examples.SparkPi");
        jobConf.setProperty("appName", "SparkExamplePi");
        jobConf.setProperty("jarPath", OozieConstant.HDFSROOTPATH
                + "/user/root/oozie_works/cron-yarn_pi/lib/spark-examples_2.11-2.2.0.jar");
        jobConf.setProperty("appParam", "10");
        // 2.4. Schedule settings
        jobConf.setProperty("start", "2022-03-05T17:42Z");
        jobConf.setProperty("freq", "0/3 * * * *");
        jobConf.setProperty("end", "2022-06-01T17:50Z");
        // 2.5. Oozie coordinator application path
        jobConf.setProperty("appPath", OozieConstant.HDFSROOTPATH
                + "/user/root/oozie_works/cron_yarn_pi");
        jobConf.setProperty(OozieClient.COORDINATOR_APP_PATH, OozieConstant.HDFSROOTPATH
                + "/user/root/oozie_works/cron_yarn_pi/coordinator.xml");
        // TODO: 3. Submit the Oozie coordinator job; the job ID is returned
        String jobId = oozieClient.run(jobConf);
        System.out.println("JobId = " + jobId);
        // TODO: 4. Poll the job status by its job ID
        while (oozieClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) {
            System.out.println("Workflow job running ...");
            Thread.sleep(10 * 1000);
        }
        System.out.println("Workflow job completed ...");
    }
}
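The frequency set above, "0/3 * * * *", uses Oozie's cron-like syntax: starting at minute 0, fire every 3 minutes. A tiny expansion of just the minute field illustrates the semantics. This is a sketch of the step syntax only, not Oozie's actual parser; the class and method names are my own:

```java
import java.util.ArrayList;
import java.util.List;

public class CronMinuteField {
    // Expand a cron minute field such as "0/3" (start at 0, step 3),
    // "*" (every minute), or a literal minute like "15".
    static List<Integer> expandMinutes(String field) {
        List<Integer> out = new ArrayList<>();
        if (field.equals("*")) {
            for (int m = 0; m < 60; m++) out.add(m);
        } else if (field.contains("/")) {
            String[] parts = field.split("/");
            int start = parts[0].equals("*") ? 0 : Integer.parseInt(parts[0]);
            int step = Integer.parseInt(parts[1]);
            for (int m = start; m < 60; m += step) out.add(m);
        } else {
            out.add(Integer.parseInt(field));
        }
        return out;
    }

    public static void main(String[] args) {
        // "0/3" yields twenty firing minutes per hour: 0, 3, 6, ..., 57
        System.out.println(expandMinutes("0/3"));
    }
}
```

So between the start and end times configured above, the coordinator materializes one workflow run every 3 minutes.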

The coordinator configuration file:

<coordinator-app name="schedule" frequency="${freq}" start="${start_date}" end="${end_date}" timezone="${timezone}" xmlns="uri:oozie:coordinator:0.2">
    <controls>
        <execution>FIFO</execution>
    </controls>
    <action>
        <workflow>
            <app-path>${wf_application_path}</app-path>
            <configuration>
                <property>
                    <name>oozie.use.system.libpath</name>
                    <value>True</value>
                </property>
                <property>
                    <name>start_date</name>
                    <value>${start_date}</value>
                </property>
                <property>
                    <name>end_date</name>
                    <value>${end_date}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>

